/*
 * Unidata Platform Community Edition
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 * This file is part of the Unidata Platform Community Edition software.
 *
 * Unidata Platform Community Edition is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Unidata Platform Community Edition is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

package org.unidata.mdm.data.service.segments.relations.upsert;

import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.Objects;
import java.util.UUID;

import org.apache.commons.lang3.BooleanUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.unidata.mdm.core.service.MetaModelService;
import org.unidata.mdm.core.type.data.RecordStatus;
import org.unidata.mdm.core.type.keys.ReferenceAliasKey;
import org.unidata.mdm.core.type.model.AttributeElement;
import org.unidata.mdm.core.type.model.RelationElement;
import org.unidata.mdm.core.type.timeline.Timeline;
import org.unidata.mdm.core.util.SecurityUtils;
import org.unidata.mdm.data.context.UpsertRelationRequestContext;
import org.unidata.mdm.data.context.UpsertRelationRequestContext.UpsertRelationHint;
import org.unidata.mdm.data.dto.UpsertRelationDTO;
import org.unidata.mdm.data.exception.DataExceptionIds;
import org.unidata.mdm.data.exception.DataProcessingException;
import org.unidata.mdm.data.module.DataModule;
import org.unidata.mdm.data.po.data.RelationEtalonPO;
import org.unidata.mdm.data.po.data.RelationOriginPO;
import org.unidata.mdm.data.po.keys.RelationExternalKeyPO;
import org.unidata.mdm.data.service.DataRecordsService;
import org.unidata.mdm.data.service.impl.CommonRecordsComponent;
import org.unidata.mdm.data.service.impl.CommonRelationsComponent;
import org.unidata.mdm.data.service.impl.RelationDraftProviderComponent;
import org.unidata.mdm.data.service.segments.ContainmentRelationSupport;
import org.unidata.mdm.data.type.apply.RelationUpsertChangeSet;
import org.unidata.mdm.data.type.data.OriginRelation;
import org.unidata.mdm.data.type.data.RelationType;
import org.unidata.mdm.data.type.data.UpsertAction;
import org.unidata.mdm.data.type.keys.RecordEtalonKey;
import org.unidata.mdm.data.type.keys.RecordKeys;
import org.unidata.mdm.data.type.keys.RecordOriginKey;
import org.unidata.mdm.data.type.keys.RelationEtalonKey;
import org.unidata.mdm.data.type.keys.RelationKeys;
import org.unidata.mdm.data.type.keys.RelationOriginKey;
import org.unidata.mdm.data.type.timeline.RelationTimeline;
import org.unidata.mdm.data.util.RecordFactoryUtils;
import org.unidata.mdm.data.util.StorageUtils;
import org.unidata.mdm.draft.service.DraftService;
import org.unidata.mdm.meta.configuration.Descriptors;
import org.unidata.mdm.meta.type.search.EntityIndexType;
import org.unidata.mdm.meta.type.search.RecordHeaderField;
import org.unidata.mdm.search.context.SearchRequestContext;
import org.unidata.mdm.search.dto.SearchResultDTO;
import org.unidata.mdm.search.dto.SearchResultHitFieldDTO;
import org.unidata.mdm.search.service.SearchService;
import org.unidata.mdm.search.type.form.FieldsGroup;
import org.unidata.mdm.search.type.form.FormField;
import org.unidata.mdm.search.type.query.SearchQuery;
import org.unidata.mdm.system.exception.PlatformBusinessException;
import org.unidata.mdm.system.type.pipeline.Start;
import org.unidata.mdm.system.type.runtime.MeasurementPoint;

/**
 * @author Mikhail Mikhailov on Nov 24, 2019
 */
@Component(RelationUpsertStartExecutor.SEGMENT_ID)
public class RelationUpsertStartExecutor extends Start<UpsertRelationRequestContext, UpsertRelationDTO> implements ContainmentRelationSupport {
    /**
     * The logger.
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(RelationUpsertStartExecutor.class);
    /**
     * This segment ID.
     */
    public static final String SEGMENT_ID = DataModule.MODULE_ID + "[RELATION_UPSERT_START]";
    /**
     * Localized message code.
     */
    public static final String SEGMENT_DESCRIPTION = DataModule.MODULE_ID + ".relation.upsert.start.description";
    /**
     * Common records component.
     */
    @Autowired
    private CommonRecordsComponent commonRecordsComponent;
    /**
     * Common rel component.
     */
    @Autowired
    private CommonRelationsComponent commonRelationsComponent;
    /**
     * The DRS.
     */
    @Autowired
    private DataRecordsService dataRecordsService;
    /**
     * MMS.
     */
    @Autowired
    private MetaModelService metaModelService;
    /**
     * The search service.
     */
    @Autowired
    private SearchService searchService;
    /**
     * DS.
     */
    @Autowired
    private DraftService draftService;
    /**
     * RDPC.
     */
    @Autowired
    private RelationDraftProviderComponent relationDraftProviderComponent;
    /**
     * Constructor.
     */
    public RelationUpsertStartExecutor() {
        super(SEGMENT_ID, SEGMENT_DESCRIPTION, UpsertRelationRequestContext.class, UpsertRelationDTO.class);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void start(UpsertRelationRequestContext ctx) {
        setup(ctx);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public String subject(UpsertRelationRequestContext ctx) {
        setup(ctx);
        RelationKeys keys = ctx.relationKeys();
        return keys.getRelationName();
    }

    /**
     * {@inheritDoc}
     */
    protected void setup(UpsertRelationRequestContext ctx) {

        if (ctx.setUp()) {
            return;
        }

        // Resolve pointer refs (Old API)
        setupPointers(ctx);

        // Possibly setup change set
        setupFieldsBeforeKeys(ctx);

        // Containments I.
        setupContainmentBeforeKeys(ctx);

        // Load timeline
        setupTimeline(ctx);

        // Verify
        setupVerify(ctx);

        // Fetch or create timeline and keys
        setupKeys(ctx);

        // Containments II.
        setupContainmentAfterKeys(ctx);

        // Other fields
        setupFieldsAfterKeys(ctx);

        ctx.setUp(true);
    }

    protected void setupPointers(UpsertRelationRequestContext ctx) {

        // skip if relation is not exist or if it is contains
        if (ctx.relationType() != RelationType.REFERENCES && ctx.relationType() != RelationType.MANY_TO_MANY) {
            return;
        }

        ReferenceAliasKey referenceResolver = ctx.getReferenceAliasKey();

        // skip if we doesn't have all necessary information about alias key
        if (referenceResolver == null || referenceResolver.getValue() == null || referenceResolver.getEntityAttributeName() == null) {
            return;
        }

        String aliasAttrName = referenceResolver.getEntityAttributeName();
        RelationElement def = metaModelService.instance(Descriptors.DATA).getRelation(ctx.relationName());
        AttributeElement attrInfo = def.getRight().getAttributes().get(aliasAttrName);

        // skip if alias key use complex attribute as a key
        // skip if alias attribute is not unique
        if (attrInfo.isComplex() || !attrInfo.isUnique()) {
            return;
        }

        Date asOf = ctx.getValidFrom() == null ? ctx.getValidTo() : ctx.getValidFrom();
        SearchRequestContext sCtx = SearchRequestContext.builder(EntityIndexType.RECORD, def.getRight().getName(), SecurityUtils.getCurrentUserStorageId())
                .query(SearchQuery.formQuery(
                        FieldsGroup.and(FormField.exact(attrInfo.getIndexed(), referenceResolver.getValue()))))
                .returnFields(Collections.singletonList(RecordHeaderField.FIELD_ETALON_ID.getName()))
                .filter(SearchQuery.formQuery(
                        FieldsGroup.and(
                                FormField.exact(RecordHeaderField.FIELD_DELETED, Boolean.FALSE),
                                FormField.range(RecordHeaderField.FIELD_FROM, null, asOf),
                                FormField.range(RecordHeaderField.FIELD_TO, asOf, null))))
                .count(10)
                .page(0)
                .build();

        SearchResultDTO result = searchService.search(sCtx);

        String etalonId = result.getHits().stream()
                .map(hit -> hit.getFieldValue(RecordHeaderField.FIELD_ETALON_ID.getName()))
                .filter(Objects::nonNull)
                .filter(SearchResultHitFieldDTO::isNonNullField)
                .filter(SearchResultHitFieldDTO::isSingleValue)
                .map(field-> field.getFirstValue().toString())
                .findAny()
                .orElse(null);

        if (etalonId == null) {
            // Considered supplementary. Just warn and continue.
            LOGGER.warn("Relation reference didn't resolved by reference alias key {}.", referenceResolver);
            return;
        }

        RecordKeys keys = commonRecordsComponent.identify(RecordEtalonKey.builder().id(etalonId).build());
        if (keys == null) {
            // Considered supplementary. Just warn and continue.
            LOGGER.warn("Relation reference didn't resolved by reference alias key {}.", referenceResolver);
            return;
        }

        ctx.keys(keys);
    }
    protected void setupFieldsBeforeKeys(UpsertRelationRequestContext ctx) {

        // 1. Change set. May already be set by batch
        if (Objects.isNull(ctx.changeSet())) {
            RelationUpsertChangeSet set = new RelationUpsertChangeSet();
            set.setRelationType(ctx.relationType());
            ctx.changeSet(set);
        }

        // 2. Set record timestamp
        Date ts = ctx.localTimestamp();
        if (ts == null) {
            ts = new Date(System.currentTimeMillis());
        }

        ctx.timestamp(ts);

    }

    protected void setupContainmentBeforeKeys(UpsertRelationRequestContext ctx) {

        // At this point we don't know whether this record is new or existing one, so we rely on identity.
        // A new containment record will be inserted via from - to pair so the reltype
        // should already be resolved by connector and we can use to/outgoing to upsert the distant record.
        // But if this is a relation identity - postpone the step until the keys have been resolved.
        if (ctx.relationType() != RelationType.CONTAINS || ctx.isValidRelationKey()) {
            return;
        }

        upsert(ctx);
    }

    protected void setupTimeline(UpsertRelationRequestContext ctx) {

        MeasurementPoint.start();
        try {

            RelationKeys keys = ctx.relationKeys();
            boolean batchInsert = ctx.isBatchOperation()
                    && Objects.nonNull(keys)
                    && keys.getOriginKey() != null
                    && keys.getOriginKey().getRevision() == 0;

            // In cases, other then batch insert try to load current timeline
            if (!batchInsert) {

                Timeline<OriginRelation> current = commonRelationsComponent.ensureAndGetRelationTimeline(ctx);

                ctx.currentTimeline(current);
                ctx.relationKeys(current.getKeys());

                keys = current.getKeys();
                if (Objects.nonNull(keys)) {

                    ctx.relationName(keys.getRelationName());
                    ctx.relationType(keys.getRelationType());
                    ctx.keys(keys.getToAsRecordKeys());
                }

            } else {
                Timeline<OriginRelation> current = new RelationTimeline(keys);
                ctx.currentTimeline(current);
            }

        } finally {
            MeasurementPoint.stop();
        }
    }

    protected void setupVerify(UpsertRelationRequestContext ctx) {

        RelationKeys relationKeys = ctx.relationKeys();

        // Both keys must be already resolved. Check for presence
        RecordKeys from = ctx.fromKeys();
        RecordKeys to = ctx.keys();

        boolean publishingHint = BooleanUtils.isTrue(ctx.getHint(UpsertRelationHint.HINT_PUBLISHING));

        // 1. Fail upsert. Etalon / LSN supplied for identity, but the rel couldn't be found
        if (relationKeys == null && ctx.isValidRelationKey()) {
            final String message = "Upsert relation received invalid input. Relation not found by etalon key [{}] | LSN [{}:{}].";
            LOGGER.warn(message, ctx.getRelationEtalonKey(), ctx.getShard(), ctx.getLsn());
            throw new DataProcessingException(message, DataExceptionIds.EX_DATA_RELATIONS_UPSERT_INVALID_INPUT);
        }

        // 2. Fail upsert. Incomplete identity.
        if (relationKeys == null && from == null) {
            final String message = "Cannot identify relation's from side record by given origin id [{}], external id [{}, {}, {}], etalon id [{}].";
            LOGGER.warn(message, ctx.getOriginKey(), ctx.getExternalId(), ctx.getSourceSystem(), ctx.getEntityName(), ctx.getEtalonKey());
            throw new PlatformBusinessException(message, DataExceptionIds.EX_DATA_RELATIONS_UPSERT_FROM_NOT_FOUND,
                    ctx.getOriginKey(), ctx.getExternalId(), ctx.getSourceSystem(), ctx.getEntityName(), ctx.getEtalonKey());
        }

        // 3. Fail upsert. Incomplete identity.
        if (relationKeys == null && to == null) {
            final String message = "Cannot identify relation's to side record by given origin id [{}], external id [{}, {}, {}], etalon id [{}].";
            LOGGER.warn(message, ctx.getOriginKey(), ctx.getExternalId(), ctx.getSourceSystem(), ctx.getEntityName(), ctx.getEtalonKey());
            throw new PlatformBusinessException(message, DataExceptionIds.EX_DATA_RELATIONS_UPSERT_TO_NOT_FOUND,
                    ctx.getOriginKey(), ctx.getExternalId(), ctx.getSourceSystem(), ctx.getEntityName(), ctx.getEtalonKey());
        }

        // 4. If contains 'to side' is defined, keys must be resolved (not new reference)
        if (ctx.relationType() == RelationType.CONTAINS
        && (ctx.isEtalonRecordKey() || ctx.isLsnKey()) && relationKeys == null && !publishingHint) {
            final String message = "Containment record upsert to '{}' failed.";
            LOGGER.warn(message, to.getEntityName());
            throw new DataProcessingException(message, DataExceptionIds.EX_DATA_RELATIONS_UPSERT_CONTAINS_KEYS_INVALID, to.getEntityName());
        }

        // 5. Check sides status
        RecordStatus fromStatus;
        RecordStatus toStatus;
        if (Objects.nonNull(relationKeys)) {
            fromStatus = relationKeys.getEtalonKey().getFrom().getStatus();
            toStatus = relationKeys.getEtalonKey().getTo().getStatus();
        } else {
            fromStatus = from.getEtalonKey().getStatus();
            toStatus = to.getEtalonKey().getStatus();
        }

        if ((fromStatus != RecordStatus.ACTIVE || toStatus != RecordStatus.ACTIVE) && !publishingHint) {
            final String message = "Left or right side of the relation is inactive.";
            LOGGER.warn(message);
            throw new PlatformBusinessException(message, DataExceptionIds.EX_DATA_RELATIONS_UPSERT_SIDES_INACTIVE);
        }
    }

    protected void setupKeys(UpsertRelationRequestContext ctx) {

        RelationKeys keys = ctx.relationKeys();

        boolean hasEtalonRecord = keys != null && keys.getEtalonKey() != null && keys.getEtalonKey().getId() != null;
        boolean hasOriginRecord = keys != null && keys.getOriginKey() != null && keys.getOriginKey().getId() != null;

        if (hasEtalonRecord && hasOriginRecord) {
            return;
        }

        // Both keys must be already resolved. Check for presence
        RecordKeys from = ctx.fromKeys();
        RecordKeys to = ctx.keys();

        // Save to this set
        RelationUpsertChangeSet set = ctx.changeSet();

        // 1. Handle a possibly new object
        if (!hasEtalonRecord) {
            keys = generateWithEtalonKey(ctx, from, to, set);
        }

        // 2. Create new relation origin, if needed
        if (!hasOriginRecord) {
            keys = updateWithOriginKey(ctx, from, to, set, keys, hasEtalonRecord);
        }

        // New records initialized to empty timeline, holding no keys.
        // This causes problems. Update empty timeline with new keys.
        if (keys.isNew()) {
            Timeline<OriginRelation> t = ctx.currentTimeline();
            t.setKeys(keys);
        }

        ctx.relationKeys(keys);
    }

    protected void setupStatus(UpsertRelationRequestContext ctx) {

        RelationKeys keys = ctx.relationKeys();
        RelationUpsertChangeSet set = ctx.changeSet();

        // 3. Check etalon status, re-enable, if inactive
        if (keys.getEtalonKey().getStatus() == RecordStatus.INACTIVE) {

            RelationEtalonPO po = RecordFactoryUtils.newRelationEtalonPO(ctx, RecordStatus.ACTIVE);
            po.setId(keys.getEtalonKey().getId());
            set.getEtalonRelationUpdatePOs().add(po);

            keys = RelationKeys.builder(keys)
                    .etalonKey(RelationEtalonKey.builder(keys.getEtalonKey())
                            .status(RecordStatus.ACTIVE)
                            .build())
                    .build();
        }

        // 4. Check origin status, re-enable, if inactive
        if (keys.getOriginKey().getStatus() == RecordStatus.INACTIVE) {

            RelationOriginPO origin = RecordFactoryUtils.newRelationOriginPO(ctx,
                    keys,
                    keys.getOriginKey().getFrom(),
                    keys.getOriginKey().getTo(),
                    RecordStatus.ACTIVE);

            origin.setId(keys.getOriginKey().getId());
            set.getOriginRelationUpdatePOs().add(origin);

            keys = RelationKeys.builder(keys)
                    .originKey(RelationOriginKey.builder(keys.getOriginKey())
                            .status(RecordStatus.ACTIVE)
                            .build())
                    .build();
        }

        ctx.relationKeys(keys);
    }

    protected void setupFieldsAfterKeys(UpsertRelationRequestContext ctx) {

        RelationKeys keys = ctx.relationKeys();

        // Action
        UpsertAction action = keys.isNew() ? UpsertAction.INSERT : UpsertAction.UPDATE;
        ctx.upsertAction(action);
    }

    protected void setupContainmentAfterKeys(UpsertRelationRequestContext ctx) {

        // This is a relation identity - the keys have been resolved so do the actual upsert.
        if (ctx.relationType() != RelationType.CONTAINS || !ctx.isValidRelationKey()) {
            return;
        }

        upsert(ctx);
    }

    private RelationKeys generateWithEtalonKey(UpsertRelationRequestContext ctx, RecordKeys from, RecordKeys to, RelationUpsertChangeSet set) {

        Date ts = ctx.timestamp();
        String user = SecurityUtils.getCurrentUserName();

        // 1.4 New relation etalon
        RelationEtalonPO etalon = RecordFactoryUtils.newRelationEtalonPO(ctx, RecordStatus.ACTIVE);
        set.setEtalonRelationInsertPO(etalon);

        return RelationKeys.builder()
                .relationName(ctx.relationName())
                .relationType(ctx.relationType())
                .fromEntityName(from.getEntityName())
                .toEntityName(to.getEntityName())
                .shard(etalon.getShard())
                .node(StorageUtils.node(etalon.getShard()))
                .etalonKey(RelationEtalonKey.builder()
                        .from(from.getEtalonKey())
                        .to(to.getEtalonKey())
                        .id(etalon.getId())
                        .status(etalon.getStatus())
                        .build())
                .createDate(ts)
                .updateDate(ts)
                .createdBy(user)
                .updatedBy(user)
                .build();
    }

    private RelationKeys updateWithOriginKey(UpsertRelationRequestContext ctx, RecordKeys from,
            RecordKeys to, RelationUpsertChangeSet set, RelationKeys keys, boolean hasEtalonRecord) {

        RecordOriginKey fromSysKey = null;
        RecordOriginKey toSysKey = null;

        RelationOriginPO system = null;
        RelationOriginPO origin
            = RecordFactoryUtils.newRelationOriginPO(ctx, keys,
                    from.getOriginKey(), to.getOriginKey(), RecordStatus.ACTIVE);

        String adminSourceSystem = metaModelService.instance(Descriptors.SOURCE_SYSTEMS).getAdminElement().getName();
        if (!hasEtalonRecord && !adminSourceSystem.equals(origin.getSourceSystem())) {

            fromSysKey = from.findBySourceSystemWithoutEnrichments(adminSourceSystem);
            toSysKey = to.findBySourceSystemWithoutEnrichments(adminSourceSystem);

            Objects.requireNonNull(fromSysKey, "Relation's 'from' system origin key cannot be null.");
            Objects.requireNonNull(toSysKey, "Relation's 'to' system origin key cannot be null.");

            system = RecordFactoryUtils.newRelationOriginPO(ctx,
                        keys,
                        fromSysKey,
                        toSysKey,
                        RecordStatus.ACTIVE);
        }

        RelationExternalKeyPO ext = new RelationExternalKeyPO();
        ext.setFromShard(from.getShard());
        ext.setToShard(to.getShard());
        ext.setFromRecordEtalonId(UUID.fromString(from.getEtalonKey().getId()));
        ext.setToRecordEtalonId(UUID.fromString(to.getEtalonKey().getId()));
        ext.setRelationName(ctx.relationName());
        ext.setRelationEtalonId(UUID.fromString(keys.getEtalonKey().getId()));

        set.getOriginRelationInsertPOs().addAll(system == null ? Collections.singleton(origin) : Arrays.asList(origin, system));
        set.getExternalKeyInsertPOs().add(ext);

        RelationOriginKey rok = RelationOriginKey.builder()
                .from(from.getOriginKey())
                .to(to.getOriginKey())
                .id(origin.getId())
                .initialOwner(origin.getInitialOwner())
                .revision(0)
                .status(origin.getStatus())
                .sourceSystem(origin.getSourceSystem())
                .build();

        RelationOriginKey sok = system == null
                ? null
                : RelationOriginKey.builder()
                    .from(fromSysKey)
                    .to(toSysKey)
                    .id(system.getId())
                    .initialOwner(system.getInitialOwner())
                    .revision(0)
                    .status(system.getStatus())
                    .sourceSystem(system.getSourceSystem())
                    .build();

        // New origin record, Batch will increment revisions using its own procedure.
        // For all the other puposes 1 should be used.
        keys = RelationKeys.builder(keys)
                .originKey(rok)
                .supplementaryKeys(sok == null ? Collections.singleton(rok) : Arrays.asList(rok, sok))
                .build();

        return keys;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public RelationDraftProviderComponent draftProviderComponent() {
        return relationDraftProviderComponent;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public CommonRelationsComponent commonRelationsComponent() {
        return commonRelationsComponent;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public DataRecordsService dataRecordsService() {
        return dataRecordsService;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public DraftService draftService() {
        return draftService;
    }
}
